// 
//   Copyright © 2009 Jiří Zárevúcky <zarevucky.jiri@gmail.com>
//  
//   This program is free software: you can redistribute it and/or modify
//   it under the terms of the GNU Affero General Public License as
//   published by the Free Software Foundation, either version 3 of the
//   License, or (at your option) any later version.
//  
//   This program is distributed in the hope that it will be useful,
//   but WITHOUT ANY WARRANTY; without even the implied warranty of
//   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//   GNU Affero General Public License for more details.
//  
//   You should have received a copy of the GNU Affero General Public License
//   along with this program.  If not, see <http://www.gnu.org/licenses/>.
//  
//  


using System;
using System.IO;
using System.Threading;
using System.Collections.Generic;

using Galaxium.Protocol.Xmpp.Library.Xml;
using Galaxium.Protocol.Xmpp.Library.Core;

namespace Galaxium.Protocol.Xmpp.Library.Streams
{
	/// <summary>
	/// A <see cref="Stream"/> that tunnels binary data through the XMPP
	/// connection as base64-encoded "data" elements in the In-Band
	/// Bytestream namespace, carried either by message stanzas or by IQ-set
	/// queries depending on the <c>use_messages</c> constructor argument.
	/// </summary>
	/// <remarks>
	/// Incoming chunks are pushed in through <see cref="HandleData"/> and
	/// handed out by <see cref="Read"/>, which blocks until data is available
	/// or the stream is closed. Seeking and length queries are not supported.
	/// </remarks>
	internal class InBandBytestream: Stream, IChunkedStream
	{
		private Client _client;
		private InBandBytestreamManager _manager;
		private bool _closed;
		private string _sid;
		private JabberID _uid;
		
		// Sequence number expected on the next incoming chunk / used for the
		// next outgoing chunk. Both are 16-bit and wrap around on overflow.
		private ushort _seq_in;
		private ushort _seq_out;
		
		// Maximum number of raw (pre-base64) bytes sent per data element.
		private ushort _block_size;
		
		private bool _use_messages;
		
		// Number of bytes of the chunk at the head of _buffer that previous
		// Read calls have already consumed.
		private int _partial;
		
		// Received chunks not yet (fully) read. Also used as the lock object
		// protecting the queue, _partial and the state of _read_waiter.
		private Queue<byte[]> _buffer = new Queue<byte[]> ();
		
		// Signalled while there is buffered data, and permanently once the
		// stream has been closed so that blocked readers wake up.
		private ManualResetEvent _read_waiter = new ManualResetEvent (false);
		
		// Set once _read_waiter has been signalled for the last time and
		// released; guarded by lock (_buffer).
		private bool _waiter_closed;
		
		/// <summary>Always <c>true</c>.</summary>
		public override bool CanRead {
			get { return true; }
		}

		/// <summary>Always <c>false</c>; the stream cannot seek.</summary>
		public override bool CanSeek {
			get { return false; }
		}

		/// <summary>Always <c>true</c>.</summary>
		public override bool CanWrite {
			get { return true; }
		}

		/// <summary>Not supported.</summary>
		/// <exception cref="NotSupportedException">Always thrown.</exception>
		public override long Length {
			get { throw new NotSupportedException (); }
		}

		/// <summary>Not supported.</summary>
		/// <exception cref="NotSupportedException">Always thrown.</exception>
		public override long Position {
			get { throw new NotSupportedException (); }
			set { throw new NotSupportedException (); }
		}
		
		/// <summary>
		/// The block size this stream was created with, i.e. the maximum
		/// number of raw bytes placed in a single outgoing data element.
		/// </summary>
		public int ChunkSize {
			get { return _block_size; }
		}

		/// <summary>
		/// Creates a stream bound to the given session.
		/// </summary>
		/// <param name="client">Client used to send the outgoing stanzas.</param>
		/// <param name="manager">Manager that is notified when this stream is closed locally.</param>
		/// <param name="uid">Address of the other party; used as the recipient of every stanza.</param>
		/// <param name="sid">Session identifier placed in the "sid" attribute of every data element.</param>
		/// <param name="block_size">Maximum number of raw bytes per data element.</param>
		/// <param name="use_messages">
		/// <c>true</c> to send data in message stanzas, <c>false</c> to send
		/// it in IQ-set queries.
		/// </param>
		public InBandBytestream (Client client, InBandBytestreamManager manager,
		                         JabberID uid, string sid, ushort block_size, bool use_messages)
		{
			_client = client;
			_manager = manager;
			_sid = sid;
			_uid = uid;
			_block_size = block_size;
			_use_messages = use_messages;
		}
		
		/// <summary>Does nothing; writes are sent immediately.</summary>
		public override void Flush ()
		{
		}

		/// <summary>Not supported.</summary>
		/// <exception cref="NotSupportedException">Always thrown.</exception>
		public override long Seek (long offset, SeekOrigin origin)
		{
			throw new NotSupportedException ();
		}
		
		/// <summary>Not supported.</summary>
		/// <exception cref="NotSupportedException">Always thrown.</exception>
		public override void SetLength (long value)
		{
			throw new NotSupportedException ();
		}
		
		/// <summary>
		/// Marks the stream as closed without notifying the manager.
		/// Presumably called by the manager when the other party closes the
		/// session (not visible in this file).
		/// </summary>
		internal void HandleClosed ()
		{
			// Setting the flag first makes Close () skip the manager
			// notification below.
			_closed = true;
			ReleaseReaders ();
			Close ();
		}
		
		/// <summary>
		/// Accepts one incoming chunk and makes it available to <see cref="Read"/>.
		/// </summary>
		/// <param name="seq">Sequence number carried by the chunk.</param>
		/// <param name="data">Decoded payload of the chunk.</param>
		/// <exception cref="InvalidDataException">
		/// <paramref name="seq"/> is not the next expected sequence number.
		/// </exception>
		internal void HandleData (int seq, byte[] data)
		{
			if (seq != _seq_in)
				throw new InvalidDataException ();
		
			_seq_in ++;
			
			// An empty chunk carries nothing to deliver. Queuing it would
			// make Read return 0, which stream consumers treat as the end
			// of the stream (and a null entry would crash Read).
			if (data == null || data.Length == 0)
				return;
			
			lock (_buffer) {
				// The wait handle is gone once the stream is closed, and
				// nobody can read the data any more anyway.
				if (_waiter_closed)
					return;
				
				_buffer.Enqueue (data);
				_read_waiter.Set ();
			}
		}
		
		/// <summary>
		/// Closes the stream. Unless it is already closed, the manager is
		/// told to close the session, and any thread blocked in
		/// <see cref="Read"/> is released.
		/// </summary>
		public override void Close ()
		{
			base.Close ();
			
			if (_closed) return;
			_manager.CloseStream (_uid, _sid);
			_closed = true;
			ReleaseReaders ();
		}
		
		// Wakes up any thread blocked in Read and releases the wait handle.
		// Safe to call more than once.
		private void ReleaseReaders ()
		{
			lock (_buffer) {
				if (_waiter_closed)
					return;
				
				_waiter_closed = true;
				// Signal before releasing the handle: a reader that is
				// already waiting must be woken, otherwise it blocks forever.
				_read_waiter.Set ();
				_read_waiter.Close ();
			}
		}
	
		/// <summary>
		/// Blocks until received data is available (or the stream gets
		/// closed) and copies up to <paramref name="count"/> bytes of it
		/// into <paramref name="buffer"/>.
		/// </summary>
		/// <param name="buffer">Destination array.</param>
		/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing.</param>
		/// <param name="count">Maximum number of bytes to read.</param>
		/// <returns>
		/// The number of bytes copied; 0 if the stream was closed while
		/// waiting and nothing was buffered.
		/// </returns>
		/// <exception cref="ObjectDisposedException">The stream is already closed.</exception>
		/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
		/// <exception cref="ArgumentOutOfRangeException">
		/// <paramref name="offset"/> or <paramref name="count"/> is negative.
		/// </exception>
		/// <exception cref="ArgumentException">
		/// The requested range does not fit into <paramref name="buffer"/>.
		/// </exception>
		public override int Read (byte[] buffer, int offset, int count)
		{
			if (_closed)
				throw new ObjectDisposedException ("InBandBytestream");
			if (buffer == null)
				throw new ArgumentNullException ("buffer");
			if (offset < 0)
				throw new ArgumentOutOfRangeException ("offset");
			if (count < 0)
				throw new ArgumentOutOfRangeException ("count");
			if (count > buffer.Length - offset)
				throw new ArgumentException ();
			
			int orig_count = count;
			
			_read_waiter.WaitOne ();			
			lock (_buffer) {
				while (count > 0 && _buffer.Count != 0) {
					var arr = _buffer.Peek ();
					var cnt = Math.Min (count, arr.Length - _partial);
					Array.Copy (arr, _partial, buffer, offset, cnt);
					
					_partial += cnt;
					offset += cnt;
					count -= cnt;
					
					// Drop the head chunk only once every byte of it has
					// been handed out; otherwise keep it, with _partial
					// remembering where the next Read has to continue.
					if (_partial == arr.Length) {
						_buffer.Dequeue ();
						_partial = 0;
					}
				}
				// After closing, the event stays signalled (and its handle
				// is released), so it must not be reset.
				if (_buffer.Count == 0 && !_closed)
					_read_waiter.Reset ();
			}
			
			return orig_count - count;
		}
		
		/// <summary>
		/// Sends <paramref name="count"/> bytes of <paramref name="buffer"/>
		/// to the other party, split into chunks of at most
		/// <see cref="ChunkSize"/> bytes. Blocks until everything has been
		/// handed to the client (or, in IQ mode, acknowledged).
		/// </summary>
		/// <param name="buffer">Source array.</param>
		/// <param name="offset">Index of the first byte to send.</param>
		/// <param name="count">Number of bytes to send.</param>
		/// <exception cref="ObjectDisposedException">The stream is already closed.</exception>
		/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
		/// <exception cref="ArgumentOutOfRangeException">
		/// <paramref name="offset"/> or <paramref name="count"/> is negative.
		/// </exception>
		/// <exception cref="ArgumentException">
		/// The requested range does not fit into <paramref name="buffer"/>.
		/// </exception>
		public override void Write (byte[] buffer, int offset, int count)
		{	
			if (_closed)
				throw new ObjectDisposedException ("InBandBytestream");
			if (buffer == null)
				throw new ArgumentNullException ("buffer");
			if (offset < 0)
				throw new ArgumentOutOfRangeException ("offset");
			if (count < 0)
				throw new ArgumentOutOfRangeException ("count");
			// Written as a subtraction so that offset + count cannot overflow.
			if (count > buffer.Length - offset)
				throw new ArgumentException ();
			
			if (_use_messages)
				SendAsMessages (buffer, offset, count);
			else
				SendAsIQs (buffer, offset, count);
		}
		
		// Sends the given range as a series of message stanzas, one chunk of
		// at most _block_size bytes per message. Stops early if the stream
		// gets closed in the meantime.
		private void SendAsMessages (byte[] buffer, int offset, int count)
		{
			while (count > 0 && !_closed) {
				var data = new Element ("data", Namespaces.InBandBytestream);
				data ["sid"] = _sid;
				data ["seq"] = _seq_out.ToString ();
				data.Text = Convert.ToBase64String (buffer, offset,
				                                    Math.Min (_block_size, count),
				                                    Base64FormattingOptions.InsertLineBreaks);
				
				var message = new XmlMessage ();
				message.To = _uid;
				message.ID = "ibb::" + _sid + "::" +
					String.Format ("{0:x4}", _seq_out ++);
					
				message.AppendChild (data);
				_client.Send (message);
				
				offset += _block_size;
				count -= _block_size;
				
				// NOTE(review): fixed pause between messages, presumably a
				// crude throttle so the connection is not flooded - confirm.
				Thread.Sleep (200);
			}
		}
		
		// Sends the given range as a series of IQ-set queries, one chunk of
		// at most _block_size bytes per query. A chunk rejected with
		// "bad-request" is retried once; any other failure (or a second
		// rejection) closes the stream and rethrows the QueryException.
		private void SendAsIQs (byte[] buffer, int offset, int count)
		{
			var iq = new Iq (IqType.Set, "data", Namespaces.InBandBytestream);
			iq.To = _uid;
			iq.Query ["sid"] = _sid;
			
			bool retried = false;
	
			while (count > 0 && !_closed) {
				// NOTE(review): clearing the ID presumably makes the client
				// assign a fresh one for every query - confirm.
				iq.ID = null;
				iq.Query ["seq"] = _seq_out.ToString ();
				iq.Query.Text =
					Convert.ToBase64String (buffer, offset,
					                        Math.Min (_block_size, count),
					                        Base64FormattingOptions.InsertLineBreaks);
				
				try {
					_client.SendQuery (iq, -1);
				}
				catch (QueryException e) {
					if (!retried && e.Condition == "bad-request") {				
						// probably network-level failure or some freak conversion error
						// should never happen anyway, unless the other party is buggy
						retried = true;
						continue;
					}

					Close ();
					throw;
				}
				
				// Advance the sequence number only after the chunk was
				// accepted, so that a retry resends it under the same number.
				_seq_out ++;
				offset += _block_size;
				count -= _block_size;
				retried = false;
			}
		}
	}
}
